package com.webank.wedatasphere.qualitis.controller.thymeleaf;

import com.alibaba.fastjson.JSONObject;
import org.apache.commons.httpclient.HttpStatus;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.client.RestTemplate;

import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.util.HashMap;

/**
 * 调用底层Linkis执行数据质量分析----调用Spark执行Shell-SQL
 */
@RestController
public class RestControllerLinkis {

    Logger logger = LoggerFactory.getLogger(RestControllerLinkis.class);

    // 注入RestTemplate实例
    @Autowired
    private RestTemplate restTemplate;

    @Value("${linkis.url:http://10.130.1.37:8188}")
    private String linkisUrl;


    private String sqlSimpleOne = "select count(*) from default.student where (name='zhangsan') and (sex is null)";
    private String sqlSimpleTwo = "select count(*) from default.student where (name='lisi') and (sex is null)";
    private String sqlSimpleThree = "select * from default.student where (name = 'zhangsan' or name = 'lisi') and (sex is null)";


    /**
     * （1）登录并执行任务
     *
     * @param httpServletRequest  httpServletRequest
     * @param httpServletResponse httpServletResponse
     * @return ResponseEntity
     * @throws Exception
     */
    @GetMapping("/executeSql")
    public ResponseEntity<JSONObject> loginAndExecuteSql(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws Exception {
        String sql = httpServletRequest.getParameter("sql");
        String executeSql = "";
        if (StringUtils.isNotBlank(sql)) {
            switch (sql) {
                case "one":
                    executeSql = sqlSimpleOne;
                    break;
                case "two":
                    executeSql = sqlSimpleTwo;
                    break;
                case "three":
                    executeSql = sqlSimpleThree;
                    break;
                default:
                    executeSql = sqlSimpleOne;
                    break;
            }
        }
        logger.error("============================================================");
        ResponseEntity<JSONObject> login = login(restTemplate);
        logger.info(login.getBody().toJSONString());
        logger.error("============================================================");
        ResponseEntity<JSONObject> responseEntity = executeSql(restTemplate, executeSql);
        Long taskID = responseEntity.getBody().getJSONObject("data").getLong("taskID");
        logger.error("========================TASKID====================================");
        logger.info(taskID.toString());
        return responseEntity;
    }

    /**
     * （3）获取执行结果：
     *
     * @param httpServletRequest
     * @param httpServletResponse
     * @return
     * @throws Exception
     */
    @GetMapping("/getResult")
    public ResponseEntity<JSONObject> getResult(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws Exception {
        // 获取任务的保存路径
        String resultLocation = getFIleURL(restTemplate, httpServletRequest.getParameter("taskID"));
        // 下载保存路径·下文件的类型
        String resUrl = linkisUrl + "/api/rest_j/v1/filesystem/openFile?path=" + resultLocation + "/_0.dolphin";
        ResponseEntity<JSONObject> resResp = restTemplate.getForEntity(resUrl, JSONObject.class);
        if (resResp != null && resResp.getStatusCode().value() == HttpStatus.SC_OK) {
            //do something
            JSONObject body = resResp.getBody();
            assert body != null;
            System.out.println(body.toJSONString());
        }
        return resResp;
    }

    /**
     * （4）获取执行日志：
     *
     * @param httpServletRequest
     * @param httpServletResponse
     * @return
     * @throws Exception
     */
    @GetMapping("/getExecuteLog")
    public ResponseEntity<JSONObject> getExecuteLog(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws Exception {
        // 获取任务的执行ID
        String execID = httpServletRequest.getParameter("execID");
        // 下获取执行日志
        String resUrl = linkisUrl + "/api/rest_j/v1/entrance/" + execID + "/log?fromLine=0&size=500";
        ResponseEntity<JSONObject> resResp = restTemplate.getForEntity(resUrl, JSONObject.class);
        if (resResp != null && resResp.getStatusCode().value() == HttpStatus.SC_OK) {
            //do something
            JSONObject body = resResp.getBody();
            assert body != null;
            System.out.println(body.toJSONString());
        }
        return resResp;
    }

    /**
     * （5）终止任务：
     *
     * @param httpServletRequest
     * @param httpServletResponse
     * @return
     * @throws Exception
     */
    @GetMapping("/killExecuteTask")
    public ResponseEntity<JSONObject> killExecuteTask(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws Exception {
        // 获取任务的执行ID
        String execID = httpServletRequest.getParameter("execID");
        // 终止任务
        String resUrl = linkisUrl + "/api/rest_j/v1/entrance/" + execID + "/kill";
        ResponseEntity<JSONObject> resResp = restTemplate.getForEntity(resUrl, JSONObject.class);
        if (resResp != null && resResp.getStatusCode().value() == HttpStatus.SC_OK) {
            //do something
            JSONObject body = resResp.getBody();
            assert body != null;
            System.out.println(body.toJSONString());
        }
        return resResp;
    }

    /**
     * （2）获取执行结果的的状态
     *
     * @param httpServletRequest
     * @param httpServletResponse
     * @return
     * @throws Exception
     */
    @GetMapping("/getStatus")
    public ResponseEntity<JSONObject> getStatus(HttpServletRequest httpServletRequest, HttpServletResponse httpServletResponse) throws Exception {
        String execID = httpServletRequest.getParameter("execID");
        String statusUrl = linkisUrl + "/api/rest_j/v1/entrance/" + execID + "/status";
        ResponseEntity<JSONObject> statusResp = restTemplate.getForEntity(statusUrl, JSONObject.class);
        if (statusResp != null && statusResp.getStatusCode().value() == HttpStatus.SC_OK) {
            String status;
            for (; ; ) {
                // 睡眠6秒钟
                Thread.sleep(6000);
                // 持续刷新状态
                statusResp = restTemplate.getForEntity(statusUrl, JSONObject.class);
                status = statusResp.getBody().getJSONObject("data").getString("status");
                //死循环查看任务状态，如果任务成功或者失败，则退出循环
                if ("Succeed".equals(status) || "Failed".equals(status)) {
                    break;
                }
            }
            if ("Succeed".equals(status)) {
                // do something
                logger.error("=========================================状态执行成功=========================================");
            }
        }
        return statusResp;
    }

    private ResponseEntity<JSONObject> login(RestTemplate restClient) {
        JSONObject postData = new JSONObject();
        postData.put("userName", "hadoop");
        postData.put("password", "f45b9a1af");
        String loginUrl = linkisUrl + "/api/rest_j/v1/user/login";
        return restClient.postForEntity(loginUrl, postData, JSONObject.class);
    }

    /**
     * @param restClient
     * @param sql        要执行的sql代码
     * @return
     */
    private ResponseEntity<JSONObject> executeSql(RestTemplate restClient, String sql) {
        String url = "/api/rest_j/v1/entrance/execute";
        JSONObject map = new JSONObject();
        map.put("method", url);
        map.put("params", new HashMap<>()); //用户指定的运行服务程序的参数,必填，里面的值可以为空
        map.put("executeApplicationName", "spark");//执行引擎，我用的hive
        map.put("executionCode", sql);
        map.put("runType", "sql");//当用户执行如spark服务时，可以选择python、R、SQL等,不能为空
        //因为我没有执行文件脚本，所以没有scriptPath参数
        String executeSql = linkisUrl + url;
        return restClient.postForEntity(executeSql, map, JSONObject.class);
    }


    /**
     * @param restClient
     * @param taskID     要执行的sql代码
     * @return
     */
    private String getFIleURL(RestTemplate restClient, String taskID) {
        String historyUrl = "/api/rest_j/v1/jobhistory/" + taskID + "/get";
        String executeSql = linkisUrl + historyUrl;
        ResponseEntity<JSONObject> hisResp = restClient.getForEntity(executeSql, JSONObject.class);
        String resultLocation = null;
        logger.info(hisResp.getBody().toJSONString());
        if (hisResp != null && hisResp.getStatusCode().value() == HttpStatus.SC_OK) {
            resultLocation = hisResp.getBody().getJSONObject("data").getJSONObject("task").getString("resultLocation");
        }
        return resultLocation;
    }


    /**
     * @param restClient
     * @param resultLocation 要执行的sql代码
     * @return
     */
    private String getRestFileMsg(RestTemplate restClient, String resultLocation) {
        String resUrl = "/api/rest_j/v1/filesystem/openFile?path=" + resultLocation + "/_0.dolphin";
        String executeSql = linkisUrl + resUrl;
        ResponseEntity<JSONObject> resResp = restClient.getForEntity(executeSql, JSONObject.class);
        if (resResp != null && resResp.getStatusCode().value() == HttpStatus.SC_OK) {
            //do something
            JSONObject body = resResp.getBody();
            logger.info(body.toJSONString());
        }
        logger.error("ending==========================");
        return resultLocation;
    }


}
